package br.unifor.cmi.ha.jgcs;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Iterator;
import java.util.Vector;

import javax.management.ObjectName;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;

import org.ow2.cmi.admin.CMIMBeanConfigException;
import org.ow2.cmi.admin.MBeanUtils;
import org.ow2.cmi.controller.factory.ClusterViewManagerFactory;
import org.ow2.cmi.controller.server.ServerClusterViewManager;
import org.ow2.cmi.ha.ActiveExecutionObject;
import org.ow2.cmi.ha.BeanInfo;
import org.ow2.cmi.ha.EntityBeanReference;
import org.ow2.cmi.ha.HaMessageData;
import org.ow2.cmi.ha.MessageManager;
import org.ow2.cmi.ha.ReplicationException;
import org.ow2.cmi.ha.ReplicationManager;
import org.ow2.cmi.ha.RequestId;
import org.ow2.cmi.ha.SessionId;
import org.ow2.cmi.ha.StatefulBeanReference;
import org.ow2.cmi.ha.TOHashTable;
import org.ow2.util.log.Log;
import org.ow2.util.log.LogFactory;


public class JGCSReplicationManagerImpl implements ReplicationManager, JGCSReplicationManagerImplMBean {

    /**
     * Logger.
     */
    private static Log logger = LogFactory.getLog(JGCSReplicationManagerImpl.class);

    /**
     * Default backup info timeout.
     */
    private static final int HA_BACKUPINFO_DEFAULT_TIMEOUT = 600; // 10 minutes

    /**
     * Timeout for the info stored in the class.
     */
    private int timeout;

    /**
     * The tx table datasource String.
     */
    private String txTableDatasource;

    /**
     * The tx table datasource.
     */
    private DataSource ds = null;

    /**
     * The message manager.
     */
    private final MessageManager messageMgr;

    /**
     * Structures for the primary.
     */
    private final TOHashTable<RequestId, ActiveExecutionObject> activeExecutionObjects;

    /**
     * Holds the messages that are not sent.
     */
    private final Vector<HaMessageData> otherMessages = new Vector<HaMessageData>();

    /* Structures for the backup */

    /**
     * Holds the committing messages.
     */
    private final TOHashTable<SessionId, HaMessageData> committingMessages;

    /**
     * Holds the information related to changes in the primary. The key is the
     * ObjectId
     */
    private final TOHashTable<SessionId, BeanInfo> backupBeanInfo;

    /**
     * Holds the response associated with a request in the backup.
     */
    private final TOHashTable<RequestId, Object> backupRequestReponse;

    /**
     * Name of the bean.
     */
    private String name = "HA";

    /**
     * Object name: the name binded in the MBean Server.
     */
    private ObjectName objectName = null;

    /**
     * File containing jgcs properties
     */
    private String jgcsProperties = null;

    public JGCSReplicationManagerImpl(	final int timeout,
							            final String datasource,
							            final String jgcsProperties,
							            final long reconnectionTimeout) throws Exception {
    	this.jgcsProperties = jgcsProperties;
    	
        if(timeout == 0) {
            this.timeout = HA_BACKUPINFO_DEFAULT_TIMEOUT;
            logger.debug("Using defaults backup info timeout");
        } else {
            this.timeout = timeout;
        }
        logger.debug("Backup info timeout: " + this.timeout);

        txTableDatasource = datasource;
        logger.debug("Tx table datasource: " + txTableDatasource);

        messageMgr = new JGCSMessageManager(jgcsProperties, this, reconnectionTimeout);

        // Initialize structures
        activeExecutionObjects = new TOHashTable<RequestId, ActiveExecutionObject>(timeout);
        committingMessages = new TOHashTable<SessionId, HaMessageData>(timeout);
        backupBeanInfo = new TOHashTable<SessionId, BeanInfo>(timeout);
        backupRequestReponse = new TOHashTable<RequestId, Object>(timeout);

        // Register the CMI bean
        logger.debug("Registering HA MBean");
        objectName = MBeanUtils.registerXtraMBean(name, this);

        // Inform that the replication is ready
        ((ServerClusterViewManager) ClusterViewManagerFactory.getFactory()
                .getClusterViewManager()).setReplicationManagerStarted(true);
    }

    /**
     * Adds a modified EJB to the changes made inside a request. The EJB
     * modifications performed under the reqId will be replicated before the
     * response will be returned to the client outside the VM.
     * @param reqId the request id from the client
     * @param clusterOId the ObjectId of the changed bean
     * @param bean the serialized state of the changed bean
     */
    public void addModifiedBean(final RequestId reqId, final SessionId clusterOId, final StatefulBeanReference bean) {
        logger.debug("Adding to request " + reqId + " modified bean " + clusterOId);

        ActiveExecutionObject aeo = activeExecutionObjects.get(reqId);
        if (aeo == null) {
            logger.debug("\tCreating new ActiveExecutionObject");
            aeo = new ActiveExecutionObject(reqId, null, null);
        }
        Vector<BeanInfo> modifiedBeans = aeo.getBeans();
        if (modifiedBeans == null) {
            logger.debug("\tCreating new beans vector");
            modifiedBeans = new Vector<BeanInfo>();
        }
        modifiedBeans.add(new BeanInfo(clusterOId, bean));
        aeo.setBeans(modifiedBeans);
        activeExecutionObjects.put(reqId, aeo);
    }

    /**
     * Correlates the changes made on a modified bean with a concrete client request.
     * @param reqId the request id from the client
     * @param bean the bean reference
     */
    public void addEntityBean(final RequestId reqId, final EntityBeanReference bean) {
        if (logger.isDebugEnabled()) {
            try {
                logger.debug("Adding to request " + reqId + " entity bean: " + bean.getPrimaryKey());
            } catch (Exception e) {
            }
        }

        ActiveExecutionObject aeo = activeExecutionObjects.get(reqId);
        if (aeo == null) {
            logger.debug("\tCreating new ActiveExecutionObject");
            aeo = new ActiveExecutionObject(reqId, null, null);
        }
        Vector<EntityBeanReference> entityBeans = aeo.getEntityBeans();
        if (entityBeans == null) {
            logger.debug("\tCreating new entity beans vector");
            entityBeans = new Vector<EntityBeanReference>();
        }
        entityBeans.add(bean);
        aeo.setEntityBeans(entityBeans);
        activeExecutionObjects.put(reqId, aeo);
    }


    /**
     * Associate a response with a requestId.
     * @param reqId the request id
     * @param response the response
     */
    public void addResponse(final RequestId reqId, final Object response) {
        logger.debug("Adding response to request: " + reqId + " response: " + response);

        ActiveExecutionObject aeo = activeExecutionObjects.get(reqId);
        if (aeo == null) {
            aeo = new ActiveExecutionObject(reqId, null, response);
        } else {
            aeo.setResponse(response);
        }

        activeExecutionObjects.put(reqId, aeo);
    }

    /**
     * Replicates the EJB changes kept on the requestChanges structure for a concrete reqId.
     * @param requestId identifies the request changes to replicate hold in requestChanges structure
     * @throws ReplicationException
     */
    public void replicate(final RequestId requestId) throws ReplicationException {
        logger.debug("Replicating... Getting states from reqId " + requestId);

        ActiveExecutionObject aeo = activeExecutionObjects.get(requestId);
        if (aeo != null) {

            // Obtain the modified beans
            Vector<BeanInfo> aeobeans = aeo.getBeans();
            Vector<BeanInfo> beans = new Vector<BeanInfo>();
            if (aeobeans != null) {

                // Process beans
                for (int i = 0; i < aeobeans.size(); i++) {
                    BeanInfo bean = aeobeans.elementAt(i);
                    StatefulBeanReference br = bean.getBean();
                    logger.debug("\tProcessing bean: " + bean.getBId());
                    if (br != null) {
                       if (br.isModified()) {
                           logger.debug("\tBean modified");

                           // Obtain the bean state
                           bean.obtainBeanState();
                           beans.add(bean);
                       } else {
                           logger.debug("\tBean not modified");
                       }
                    } else {
                        logger.debug("\tBean marked for remove");

                        // The bean is marked for remove
                        beans.add(bean);
                    }
                }
            }

            boolean readOnly = true;

            // Check if the transaction is read only for entity beans
            Vector<EntityBeanReference> entityBeans = aeo.getEntityBeans();
            if (entityBeans != null) {
                if (entityBeans.size() > 0){
                    for (int i = 0; i < entityBeans.size(); i++) {
                        EntityBeanReference ebr = entityBeans.elementAt(i);
                        if (ebr.isModified()) {
                            readOnly = false;
                            break;
                        }
                    }
                } else {
                    readOnly = true;
                }
            } else {
                readOnly = true;
            }

            // piggy back other messages
            Object response = aeo.getResponse();
            Vector<HaMessageData> om = new Vector<HaMessageData>();
            synchronized (otherMessages) {
                if (otherMessages.size() > 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("\tPiggy back messages");
                        for (HaMessageData md : otherMessages) {
                            logger.debug("\t\tMessage: " + md);
                        }
                    }

                    om.addAll(otherMessages);
                    otherMessages.clear();
                } else {
                    logger.debug("\tNo messages to Piggy back");
                }
            }

            HaMessageData messageData = HaMessageData.newCommittingMessage(requestId, response,
                                                                            beans, readOnly, om);

            try {
                logger.debug("\tSend message: " + messageData);

                if (!readOnly) {
                    insertTxid(requestId);
                }
                messageMgr.sendMessage(messageData);
                logger.debug("\tDeleting changes from requestChanges table for reqId: " + requestId);
                activeExecutionObjects.remove(requestId);
            } catch (Exception e) {
                logger.error("\tunable to replicate", e);
                throw new ReplicationException();
            }
            logger.debug("\tactiveExecutionObjects T. Size : " + activeExecutionObjects.size());
        } else {
            //On an aborted transaction
            logger.debug("\tNothing to replicate on committing for: " + requestId);
        }
    }

    /* (non-Javadoc)
     * @see org.ow2.cmi.ha.ReplicationManager#replicateCommit(org.ow2.cmi.ha.RequestId)
     */
    public void replicateCommit(final RequestId reqId, final boolean committed) throws ReplicationException {
        logger.debug("Replicate commit: " + reqId + " committed: " + committed);

        HaMessageData messageData = null;
        if(committed) {
            messageData = HaMessageData.newCommitMessage(reqId);
        } else {
            ActiveExecutionObject aeo = activeExecutionObjects.get(reqId);
            Object response = null;
            if (aeo != null) {
                response = aeo.getResponse();
                activeExecutionObjects.remove(reqId);
            }
            //COMPLETE: Should this line be executed if aeo == null. Think about it
            messageData = HaMessageData.newAbortMessage(reqId, response);
        }

        synchronized (otherMessages) {
            logger.debug("Add message to otherMessages: " + messageData);

            otherMessages.add(messageData);
        }
    }


    // Backup node

    /**
     * Returns true if the request id has an associated response, false in other case.
     * @param requestId the request id
     * @return true if the request id has an associated response, false in other case
     */
    public boolean hasBackupResponse(final RequestId requestId) {
        logger.debug("Check response for: " + requestId);

        boolean found = false;
        found = backupRequestReponse.containsKey(requestId);
        // If not found look for it in committing messages
        if (!found) {
            logger.debug("\tResponse not found in backupRequestResponse");

            synchronized (committingMessages) {
//                boolean inCM = committingMessages.containsKey(requestId);
                HaMessageData data = committingMessages.get(requestId.getObjectId());
                if (data != null) {
                    if (requestId.getRequestNo() == data.getRequestId().getRequestNo()) {
                        logger.debug("\tMessage found in commitingMessages");

                        if (isCommitted(requestId)) {
                            logger.debug("\tTransaction found in tx table");

                            commitMessageToBeanInfo(requestId);
                            found = true;
                        } else {
                            logger.debug("\tTransaction not found in tx table");

                            committingMessages.remove(requestId.getObjectId());
                            found = false;
                        }
                    } else {
                        logger.debug("\tMessage found in commitingMessages but with bad request number");
                    }
                }
            }
        } else {
            logger.debug("\tResponse found in backupRequestResponse");
        }

        return found;
    }

    /**
     * Returns the response associated with the request id.
     * Returns null if there is not response associated
     * @param reqId
     * @return the associated response
     */
    public Object getBackupResponse(final RequestId reqId) {
        //COMPLETE: Check if the response must be removed from table
        return backupRequestReponse.get(reqId);
    }

    /**
     * Applies the changes kept in BeanChanges. Changes will be applied to the
     * concrete concrete custerOID specified.
     * @param clusterOID
     * @param bean
     */
    public void restoreBeanChanges(final SessionId clusterOID, final StatefulBeanReference bean) {
            logger.debug("Trying to restore EJB: " + clusterOID);
            logger.debug("\tSearching in backupBeanInfo");
        synchronized (backupBeanInfo) {
            if (backupBeanInfo.containsKey(clusterOID)) {
                logger.debug(clusterOID + "\tBean found!");

                BeanInfo beanInfo = backupBeanInfo.get(clusterOID);
                // Injects the state for the bean
                try {
                    bean.injectState(beanInfo.getState());
                    logger.debug(clusterOID + "\tState injected!");
                } catch (Exception e) {
                    logger.error("\tError injecting the state in: " + clusterOID, e);
                }
                // Remove bean info from table in this node
                backupBeanInfo.remove(clusterOID);
                logger.debug("\t" + clusterOID + " EJB removed from backupBeanInfo!");
            } else {
                logger.debug(clusterOID + "\tBean not found!");
            }
        }
    }

    /**
     * Processes a message data.
     * @param data the message data
     */
    public void processMessage(final HaMessageData data) {
        switch(data.getType()) {
            case COMMITTING_MESSAGE:
                processCommittingMessage(data);
                break;
            case COMMIT_MESSAGE:
                processCommitMessage(data);
                break;
            case ABORT_MESSAGE:
                processAbortMessage(data);
                break;
            default:
                logger.error("Unknown message type: " + data.getType());
        }
    }

    /**
     * Processes a committing message data.
     * @param data the message data
     */
    private void processCommittingMessage(final HaMessageData data) {
        logger.debug("Processing committing message: " + data);

        // Process piggy-backed messages
        Iterator<HaMessageData> it = data.getOtherMessages().iterator();

        if (logger.isDebugEnabled()) {
            if (it.hasNext()) {
                logger.debug("Processing piggy backed messages");
            }
        }

        while (it.hasNext()) {
            HaMessageData hmd = it.next();
            processMessage(hmd);
        }

        // If there are not entity bean modified process the message
        if (data.isReadOnly()) {
            messageToBeanInfo(data);
        } else {
            committingMessages.put(data.getRequestId().getObjectId(), data);
        }
    }

    /**
     * Processes a commit message data.
     * @param data the message data
     */
    private void processCommitMessage(final HaMessageData data) {
        logger.debug("Processing commit message: " + data);

        RequestId requestId = data.getRequestId();
        if (!commitMessageToBeanInfo(requestId)) {
            logger.debug("Request " + requestId + " from commit message don't found in table");
        }
    }

    /**
     * Processes an abort message data.
     * @param data the message data
     */
    private void processAbortMessage(final HaMessageData data) {
        logger.debug("Processing abort message: " + data);

        RequestId requestId = data.getRequestId();
        HaMessageData md = committingMessages.get(requestId.getObjectId());

        if (md != null) {
            // Add the response to requestResponse table
            backupRequestReponse.put(requestId, md.getResponse());

            committingMessages.remove(requestId.getObjectId());
        }
    }

    /**
     * Look for a request id in committing messages. If found the state
     * is stored in backupBeanInfo and backupRequestResponse
     * @param requestId the request id
     * @return true if the request Id have been found in committing messages
     */
    private boolean commitMessageToBeanInfo(final RequestId requestId) {
        logger.debug("Processing from commitingMessages: " + requestId);

        boolean found = false;
        HaMessageData md = committingMessages.get(requestId.getObjectId());
        if (md != null) {
            logger.debug("\tFound in committingMessages");

            found = true;
            messageToBeanInfo(md);
            // Remove message from committing messages
            committingMessages.remove(requestId.getObjectId());
        } else {
            logger.debug("\tNot found in committingMessages");

            found = false;
        }
        return found;
    }

    /**
     * Process a message an include it info the bean info table.
     * @param md the message
     */
    private void messageToBeanInfo(final HaMessageData md) {
        // Add the beans to backupBeanInfo table
        Iterator<BeanInfo> it = md.getBeans().iterator();
        if (logger.isDebugEnabled()) {
            if (it.hasNext()) {
                logger.debug("\tMessage has beans");
            } else {
                logger.debug("\tMessage don't have beans");
            }
        }

        while (it.hasNext()) {
            BeanInfo bi = it.next();
            if (bi.getState() != null) {
                logger.debug("\tBean added to backupBeanInfo: " + bi.getBId());

                backupBeanInfo.put(bi.getBId(), bi);
            } else {
                logger.debug("\tBean removed from backupBeanInfo: " + bi.getBId());

                backupBeanInfo.remove(bi.getBId());
            }
        }
        // Add the response to requestResponse table
        backupRequestReponse.put(md.getRequestId(), md.getResponse());
    }

    /**
     * Inserts the transaction id in the database.
     * @param txid the transaction id
     */
    private void insertTxid(final RequestId txid) {
        logger.debug("Insert txid in transaction table: " + txid);

        PreparedStatement pstmt = null;
        Connection conn = null;


        try {
            conn = getConnection();
            pstmt = conn.prepareStatement("insert into ha_transactions (txid) values (?)");
            pstmt.setString(1, txid.toString());
            int count = pstmt.executeUpdate();
            if (count != 1) {
                logger.error("\tUnable to insert tx id: " + txid + " expected 1, received: " + count);
            }
        } catch (Exception e) {
            logger.error("\tUnable to insert tx id: " + txid, e);
        } finally {
            // Release DB connection
            try {
                if (pstmt != null) {
                    pstmt.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                // do nothing
            }
        }
    }

    /**
     * Access the database to check if the transaction have committed.
     * @param txid the transaction id
     * @return true if the transaction have committed
     */
    private boolean isCommitted(final RequestId txid) {
        logger.debug("Check for txid in transaction table: " + txid);

        boolean found = false;
        PreparedStatement pstmt = null;
        Connection conn = null;

        try {
            conn = getConnection();
            pstmt = conn.prepareStatement("select txid from ha_transactions where txid = ?");
            pstmt.setString(1, txid.toString());
            ResultSet rs = pstmt.executeQuery();
            if (rs.next()) {
                logger.debug("\tTransaction found");

                found = true;
            } else {
                logger.debug("\tTransaction not found");

                found = false;
            }
        } catch (Exception e) {
            // COMPLETE: Handle errors
            logger.error("\tUnable to obtain tx id: " + txid, e);
        } finally {
            // Release DB connection
            try {
                if (pstmt != null) {
                    pstmt.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                ;
            }
        }

        return found;
    }

    /**
     * Obtains a new connection.
     * @return the connection
     * @throws Exception
     */
    private Connection getConnection() throws Exception {
        Connection conn;
        conn = getDatasource().getConnection();
        if (logger.isDebugEnabled()) {
            logger.debug("Tx table connection, isolation level: "
                                + conn.getTransactionIsolation()
                                + " autocommit: " + conn.getAutoCommit());
        }
        return conn;
    }

    /**
     * Returns the tx table datasource.
     * @return the tx table datasource
     * @throws Exception
     */
    private DataSource getDatasource() throws Exception {
        if (ds == null) {
            try {
                Context ctx = new InitialContext();
                ds = (DataSource) ctx.lookup(txTableDatasource);
            } catch (NamingException e) {
                logger.error("Unable to find datasource", e);
                throw e;
            }
        }
        return ds;
    }

    /**
     * Closes and stops all the elements used (channel, dispatcher...).
     */
    public void clear() {
        messageMgr.clear();
        try {
            MBeanUtils.unRegisterXtraMBean(objectName, name);
        } catch (CMIMBeanConfigException e) {
            logger.warn("Unable to unregister the MBean for the HA service.", e);
        }

    }

    /**
     * @return the name of this bean
     */
    public String getName() {
        return name;
    }

    /**
     * @return Object Name
     */
    public String getobjectName() {
        return objectName.toString();
    }

    /**
     * Sets the object name of this MBean.
     * @param name the Object Name
     */
    public void setobjectName(final ObjectName name) {
        this.objectName = name;
    }


    /**
     * Get the number of replicated messages sent.
     * @return the number of replicated messages
     */
    public long getnumberofReplicatedMessages() {
        return messageMgr.getNumberofReplicatedMessages();
    }

    /**
     * Get the average size of the replicated messages sent.
     * @return the number of replicated messages
     */
    public double getavgSizeofReplicatedMessages() {
        return messageMgr.getAvgSizeofReplicatedMessages();
    }

    /**
     * Get the total size of the replicated messages sent.
     * @return the total size of the replicated messages
     */
    public double gettotSizeofReplicatedMessages() {
        return messageMgr.getTotSizeofReplicatedMessages();
    }

    /**
     * Get the JGroups configuration file name.
     * @return the JGroups configuration file name
     */
    public String getjgcsProperties() {
        return jgcsProperties;
    }

    /**
     * Get the SFSB info timeout to clean certain information stored in memory.
     * @return the timeout that
     */
    public long getinfoTimeout() {
        return timeout;
    }

    /**
     * Set the SFSB info timeout. The info stored in the node is removed when
     * the timer expires.
     * @param timeout the timeout to clean the info
     */
    public void setinfoTimeout(final long timeout) {
        this.timeout = (int) timeout;

        activeExecutionObjects.setTimeout(this.timeout);
        committingMessages.setTimeout(this.timeout);
        backupBeanInfo.setTimeout(this.timeout);
        backupRequestReponse.setTimeout(this.timeout);
    }

    /**
     * Get the Datasource name required to hold the EB version numbers of the
     * horizontal replication approach.
     * @return the Datasource name
     */
    public String getdatasourceName() {
        return txTableDatasource;
    }

    /**
     * Set the Datasource name required to hold the EB version numbers of the
     * horizontal replication approach.
     * @param name the name of the datasource
     */
    public void setdatasourceName(final String name) {
        this.txTableDatasource = name;
        // Force to obtain a new datasource
        this.ds = null;
    }

}

